Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low code): Add GroupingPartitionRouter #354

Open
wants to merge 11 commits into
base: main
Choose a base branch
from

Conversation

tolik0
Copy link
Contributor

@tolik0 tolik0 commented Feb 20, 2025

Implements: https://github.com/airbytehq/airbyte-internal-issues/issues/11622

Summary by CodeRabbit

  • New Features

    • Introduced a new GroupingPartitionRouter for enhanced partition grouping functionality, allowing for batching of partitions based on a specified size.
    • Added an option to automatically remove duplicate partition entries within each batch.
    • Updated SimpleRetriever and AsyncRetriever components to support the new grouping functionality, enabling simultaneous processing of multiple partitions in API requests.
  • Tests

    • Added a comprehensive suite of unit tests for the GroupingPartitionRouter, covering various scenarios including grouping behavior, lazy iteration, and handling of extra fields.
    • Enhanced test coverage for the creation of GroupingPartitionRouter with an underlying router and validation of group size.

@github-actions github-actions bot added the enhancement New feature or request label Feb 20, 2025
Copy link
Contributor

coderabbitai bot commented Feb 20, 2025

📝 Walkthrough

Walkthrough

This pull request introduces a new component called GroupingPartitionRouter for declarative data extraction. It allows batches of partition keys to be grouped into slices with a configurable size and optional deduplication. The changes span YAML schema updates, new and modified Python classes in models, a factory method for component instantiation, and export adjustments to expose the new router. Existing retriever components are updated to support the new grouping functionality.

Changes

File(s) Change Summary
airbyte_cdk/.../declarative_component_schema.yaml
airbyte_cdk/.../models/declarative_component_schema.py
Added new GroupingPartitionRouter definitions. Updated SimpleRetriever and AsyncRetriever to include it as a valid partition router. Also added required type imports.
airbyte_cdk/.../parsers/model_to_component_factory.py Introduced the create_grouping_partition_router method to create and validate a GroupingPartitionRouter from a model and configuration.
airbyte_cdk/.../partition_routers/__init__.py
airbyte_cdk/.../partition_routers/grouping_partition_router.py
Exported and implemented the GroupingPartitionRouter. The implementation groups partitions into batches based on a set group size and applies deduplication when enabled.
unit_tests/.../partition_routers/test_grouping_partition_router.py
unit_tests/.../parsers/test_model_to_component_factory.py
Added unit tests for the GroupingPartitionRouter functionality and its integration with the factory method.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Factory as ModelToComponentFactory
    participant Underlying as PartitionRouter
    participant Grouping as GroupingPartitionRouter

    Client->>Factory: create_grouping_partition_router(model, config)
    Factory->>Factory: _create_component_from_model(model)
    Factory-->>Underlying: Instantiate underlying partition router
    Factory->>Factory: Validate instance is a PartitionRouter
    alt Valid underlying router
       Factory->>Grouping: Initialize GroupingPartitionRouter with parameters
       Grouping-->>Factory: GroupingPartitionRouter instance created
    else Invalid underlying router
       Factory-->>Client: Raise ValueError
    end
    Factory-->>Client: Return GroupingPartitionRouter instance
Loading
sequenceDiagram
    participant Grouping as GroupingPartitionRouter
    participant Underlying as UnderlyingPartitionRouter
    participant Client as DataConsumer

    Client->>Grouping: Request stream slices
    Grouping->>Underlying: Call stream_slices()
    loop Processing partitions
       Underlying-->>Grouping: Return partition slice
       Grouping->>Grouping: Buffer slice (apply deduplication if enabled)
       alt Batch size reached or stream end
          Grouping-->>Client: Yield grouped slice batch
       end
    end
Loading

Possibly Related PRs

Suggested Reviewers

  • maxi297 – Would you be open to reviewing these changes? wdyt?
  • aldogonzalez8 – Could you take a look at this as well?

Tip

CodeRabbit's docstrings feature is now available as part of our Pro Plan! Simply use the command @coderabbitai generate docstrings to have CodeRabbit automatically generate docstrings for your pull request. We would love to hear your feedback on Discord.

✨ Finishing Touches
  • 📝 Generate Docstrings

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

‼️ IMPORTANT
Auto-reply has been disabled for this repository in the CodeRabbit settings. The CodeRabbit bot will not respond to your replies unless it is explicitly tagged.

  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py (1)

12-24: Nitpick: Clarify mention of SinglePartitionRouter in the docstring.

In the docstring (line 20), the router is referred to as a SinglePartitionRouter, but the code uses PartitionRouter. Might be worth updating to maintain consistency and reduce confusion, wdyt?

airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2)

3133-3141: Integration in SimpleRetriever: Inclusion of GroupingPartitionRouter
It looks like you added references to the new GroupingPartitionRouter inside the partition_router allowed types for SimpleRetriever. This should enable users to choose the grouping strategy for partitioning. Have you validated that the ordering and overall schema constraints are still correct here? wdyt?


3295-3302: Integration in AsyncRetriever: Inclusion of GroupingPartitionRouter
Similar to the SimpleRetriever, the AsyncRetriever now accepts GroupingPartitionRouter in its partition_router field. This is a great step toward consistency. Would you mind confirming that the asynchronous flow correctly processes grouped partitions as expected in practice? wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 978be1b and 96ee457.

📒 Files selected for processing (5)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (3 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (4 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3 hunks)
  • airbyte_cdk/sources/declarative/partition_routers/__init__.py (2 hunks)
  • airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py (1 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py

[error] 43-43: Need type annotation for 'seen_keys'


[error] 51-51: Unsupported right operand type for in ('set[Any] | None')


[error] 57-57: Item 'None' of 'set[Any] | None' has no attribute 'add'

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[error] 3056-3056: Argument 'model' to '_create_component_from_model' of 'ModelToComponentFactory' has incompatible type 'CustomPartitionRouter | ListPartitionRouter | SubstreamPartitionRouter | None'; expected 'BaseModel'


[error] 3064-3064: Unexpected keyword argument 'parameters' for 'GroupingPartitionRouter'

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

[error] 2376-2376: Invalid type comment or annotation. Suggestion: use conint[...] instead of conint(...)

⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Analyze (python)
🔇 Additional comments (3)
airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py (1)

68-117: LGTM for request/response methods.

The request/response components and their delegations appear solid and maintainable. wdyt?

airbyte_cdk/sources/declarative/partition_routers/__init__.py (1)

11-13: Looks good!

Exporting GroupingPartitionRouter here cleanly integrates it into your module’s public API, wdyt?

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

230-232: LGTM! Import follows the established pattern.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

3052-3070: ⚠️ Potential issue

Fix parameter handling in GroupingPartitionRouter instantiation.

The pipeline failure indicates that parameters is not a valid argument for GroupingPartitionRouter. Looking at the implementation, we should remove this argument.

What do you think about this fix?

     return GroupingPartitionRouter(
         group_size=model.group_size,
         underlying_partition_router=underlying_router,
         deduplicate=model.deduplicate if model.deduplicate is not None else True,
-        parameters=model.parameters or {},
         config=config,
     )
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

2390-2411: 🛠️ Refactor suggestion

The GroupingPartitionRouter schema looks good, but should we update the type annotation?

The implementation looks solid with well-documented fields. However, there's a potential issue with the conint type annotation that might cause problems. Would you like to update it to use bracket notation instead?

-    group_size: conint(ge=1) = Field(
+    group_size: conint[ge=1] = Field(
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

3055-3057: Add type safety check for underlying_partition_router.

To prevent potential runtime errors, we should add a type check for the underlying_partition_router model.

What do you think about adding this type check?

+    if not isinstance(model.underlying_partition_router, BaseModel):
+        raise ValueError(
+            f"underlying_partition_router must be a BaseModel instance, got {type(model.underlying_partition_router)}"
+        )
+
     underlying_router = self._create_component_from_model(
         model=model.underlying_partition_router, config=config
     )
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 96ee457 and b35a165.

📒 Files selected for processing (5)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (3 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (11 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3 hunks)
  • airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py (1 hunks)
  • unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/models/declarative_component_schema.py

[error] 1-1: Ruff formatting check failed. 1 file would be reformatted. Run 'ruff format' to fix code style issues in this file.

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[error] 3086-3086: Unexpected keyword argument 'parameters' for 'GroupingPartitionRouter'

🪛 GitHub Actions: Pytest (Fast)
unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py

[error] 9-9: ModuleNotFoundError: No module named 'test_substream_partition_router'.

⏰ Context from checks skipped due to timeout of 90000ms (5)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Analyze (python)
🔇 Additional comments (8)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)

3419-3458: Friendly Check on the New GroupingPartitionRouter Component

You've introduced the new GroupingPartitionRouter with clear required fields such as type, group_size, and underlying_partition_router, and added an optional deduplicate flag with a sensible default of true. I really appreciate how the description transparently explains that grouping may affect per-partition incremental syncs—this thoughtful note will help users understand potential trade-offs.

A couple of questions to ensure we’re covering all bases:

  • Would you consider expanding the description of deduplicate to detail how duplicate partitions are identified or handled? This might further assist users configuring the component.
  • Is the naming of underlying_partition_router consistent with related schemas elsewhere and aligned with our overall naming conventions? It looks correct here, but it might be good to verify this across the codebase.

Overall, this component appears well thought out. wdyt?

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)

230-232: LGTM!

The import of GroupingPartitionRouterModel is correctly placed in alphabetical order with other model imports.


385-385: LGTM!

The import of GroupingPartitionRouter is correctly placed in alphabetical order with other router imports.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

2242-2250: LGTM! The partition router integration is well-structured.

The GroupingPartitionRouter is properly integrated into the list of available partition routers for both SimpleRetriever and AsyncRetriever.

unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py (4)

89-171: LGTM! Comprehensive test coverage for grouping behavior.

The parametrized test cases effectively cover different group sizes and deduplication scenarios. The expected results are well-defined and the assertions are clear.


203-299: LGTM! Excellent lazy iteration testing.

The test effectively verifies that the GroupingPartitionRouter processes partitions lazily using a controlled iterator. The assertions at different stages provide good confidence in the implementation.


330-375: LGTM! Good edge case coverage for extra fields.

The test properly handles scenarios with missing fields, ensuring the router can handle varied data structures gracefully.


419-446: LGTM! Good test for non-empty parent slices.

The test ensures that parent slice information is properly preserved during grouping operations.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

♻️ Duplicate comments (2)
airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py (2)

42-44: 🛠️ Refactor suggestion

Fix type safety in deduplication logic.

Based on past review comments, there are type safety issues in the deduplication logic. Would you consider this approach to improve type safety, wdyt?

-        batch = []
-        seen_keys = set()
+        from typing import List, Set, Any
+        batch: List[StreamSlice] = []
+        seen_keys: Set[Any] = set()

42-44: ⚠️ Potential issue

Fix type safety issues in deduplication logic.

The seen_keys variable needs type annotation to prevent type checker issues. Additionally, consider handling the case where partitions might be empty.

What do you think about these changes?

-        batch = []
-        seen_keys = set()
+        from typing import Set, Any
+        batch: list[StreamSlice] = []
+        seen_keys: Set[Any] = set()
🧹 Nitpick comments (8)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)

3849-3903: LGTM! The test is well-structured and comprehensive.

The test effectively validates:

  • Creation of GroupingPartitionRouter with correct group size
  • Proper configuration of the underlying SubstreamPartitionRouter
  • Parent stream configuration and field mappings

One suggestion: Would you consider adding a test case for deduplication behavior since GroupingPartitionRouter supports it? wdyt?

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)

3053-3072: Add type safety checks for the underlying partition router.

The current implementation could be improved with better type safety. Would you consider adding type checks for model.underlying_partition_router before passing it to _create_component_from_model? This would help catch type-related issues earlier. For example:

 def create_grouping_partition_router(
     self, model: GroupingPartitionRouterModel, config: Config, **kwargs: Any
 ) -> GroupingPartitionRouter:
+    if not isinstance(model.underlying_partition_router, BaseModel):
+        raise ValueError(
+            f"underlying_partition_router must be a BaseModel instance, got {type(model.underlying_partition_router)}"
+        )
+
     underlying_router = self._create_component_from_model(
         model=model.underlying_partition_router, config=config
     )

wdyt?


3053-3072: Consider adding type hints for better type safety.

The implementation could benefit from explicit type hints. Would you consider these changes to improve type safety, wdyt?

    def create_grouping_partition_router(
        self, model: GroupingPartitionRouterModel, config: Config, **kwargs: Any
    ) -> GroupingPartitionRouter:
+       from typing import cast
        underlying_router = self._create_component_from_model(
            model=model.underlying_partition_router, config=config
        )
        if model.group_size < 1:
            raise ValueError(f"Group size must be greater than 0, got {model.group_size}")

        if not isinstance(underlying_router, PartitionRouter):
            raise ValueError(
                f"Underlying partition router must be a PartitionRouter subclass, got {type(underlying_router)}"
            )

        return GroupingPartitionRouter(
            group_size=model.group_size,
            underlying_partition_router=cast(PartitionRouter, underlying_router),
            deduplicate=model.deduplicate if model.deduplicate is not None else True,
            config=config,
        )
airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py (4)

31-67: Consider adding error handling for empty partitions.

The current implementation assumes that partitions will always have values. Would you consider adding error handling for empty partitions? For example:

 def stream_slices(self) -> Iterable[StreamSlice]:
     batch = []
     seen_keys = set()

     for partition in self.underlying_partition_router.stream_slices():
+        if not partition.partition:
+            continue
+
         key = next(iter(partition.partition.values()), None)

Also, consider adding a docstring note about how empty partitions are handled, wdyt?


68-96: Consider handling edge cases in _create_grouped_slice.

The method assumes that all partitions in a batch have the same keys. Would you consider adding validation or handling for cases where partitions might have different keys? For example:

 def _create_grouped_slice(self, batch: list[StreamSlice]) -> StreamSlice:
+    if not batch:
+        raise ValueError("Cannot create a grouped slice from an empty batch")
+
     grouped_partition = {
-        key: [p.partition.get(key) for p in batch] for key in batch[0].partition.keys()
+        key: [p.partition.get(key) for p in batch] 
+        for key in set().union(*(p.partition.keys() for p in batch))
     }

wdyt?


68-97: Consider improving type hints in _create_grouped_slice method.

The method could benefit from more explicit type hints. Would you consider these changes, wdyt?

-    def _create_grouped_slice(self, batch: list[StreamSlice]) -> StreamSlice:
+    def _create_grouped_slice(self, batch: List[StreamSlice]) -> StreamSlice:
         """
         Creates a grouped StreamSlice from a batch of partitions, aggregating extra fields into a dictionary with list values.
 
         Args:
-            batch (list[StreamSlice]): A list of StreamSlice objects to group.
+            batch (List[StreamSlice]): A list of StreamSlice objects to group.
 
         Returns:
             StreamSlice: A single StreamSlice with combined partition and extra field values.
         """
         # Combine partition values into a single dict with lists
+        from typing import Dict
+        grouped_partition: Dict[str, list] = {
-        grouped_partition = {
             key: [p.partition.get(key) for p in batch] for key in batch[0].partition.keys()
         }
 
         # Aggregate extra fields into a dict with list values
-        extra_fields_dict = (
+        extra_fields_dict: Dict[str, list] = (

47-52: Consider adding error handling for empty partitions.

The code assumes that partition values exist. Would you consider adding validation to handle empty partitions gracefully, wdyt?

         # Extract the partition key (assuming single key-value pair, e.g., {"board_ids": value})
+        if not partition.partition:
+            continue
         key = next(iter(partition.partition.values()), None)
+        if key is None:
+            continue
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)

3420-3457: Review the GroupingPartitionRouter schema definition:

  • The new GroupingPartitionRouter component is clearly defined with its key properties: type, group_size, and underlying_partition_router. The renaming to underlying_partition_router nicely addresses the previous naming consistency comment.
  • The description is comprehensive, explaining both the use case (grouping partitions for APIs supporting multiple keys in one request) and a potential pitfall with incremental syncs; this extra context is really helpful.
  • The default value for the deduplicate flag is set to true, which seems sensible for ensuring unique partitions. Is this the intended behavior in most cases? Perhaps adding a brief note on why duplications are removed by default could further guide users.
  • Would you consider providing a small example configuration snippet in the documentation? It might help users better understand how to integrate this new component into their stream settings.
    wdyt?
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cf782a2 and 6cb895e.

📒 Files selected for processing (6)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (3 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4 hunks)
  • airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py (1 hunks)
  • unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (3 hunks)
  • unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py
👮 Files not reviewed due to content moderation or server errors (2)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
  • airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Analyze (python)
🔇 Additional comments (5)
airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py (3)

1-30: LGTM! Well-documented class with clear attributes.

The class is well-structured with comprehensive docstrings explaining its purpose and attributes. The use of dataclass and type hints is appropriate.


98-137: LGTM! Clean implementation of request methods and state management.

The request methods return empty mappings as expected, and state management is properly delegated to the underlying router.


130-136: LGTM! Clean state management delegation.

The implementation correctly delegates state management to the underlying partition router.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)

2374-2394: Consider adding validation for group_size.

The group_size field should be positive to ensure valid grouping. Based on the pipeline error, would you like to update it using bracket notation like this?

-    group_size: int = Field(
+    group_size: conint[ge=1] = Field(

Also, consider adding more descriptive examples in the docstring to show how the router can be used with different underlying routers, wdyt?


2228-2237: LGTM! The partition_router field updates look good.

The changes maintain backward compatibility and consistency between SimpleRetriever and AsyncRetriever classes.

Also applies to: 2314-2323

@tolik0 tolik0 requested a review from maxi297 February 24, 2025 17:54
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

3053-3079: Consider adding type hints for the underlying_partition_router parameter.

The implementation looks solid with proper validation checks. However, we could improve type safety by adding type hints for the underlying_partition_router parameter in the model. What do you think about this change?

-        underlying_router = self._create_component_from_model(
-            model=model.underlying_partition_router, config=config
+        underlying_router: PartitionRouter = self._create_component_from_model(
+            model=model.underlying_partition_router, config=config
         )

Verify the behavior of deduplicate parameter.

The deduplicate parameter defaults to True if not specified. Could you confirm if this is the intended default behavior? Also, should we document this default value in the model's schema?

Consider adding validation for parent stream configurations.

The code checks for request options in parent stream configs but doesn't validate other potential issues. Should we add more comprehensive validation? For example:

  1. Validate that parent streams don't have their own GroupingPartitionRouter to prevent nesting
  2. Check for any other incompatible configurations
    wdyt?
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)

3849-3999: Consider adding test cases for edge scenarios?

The current test coverage looks good but we could potentially add tests for:

  • Deduplication behavior
  • Different group sizes
  • Multiple parent streams
  • Empty parent stream results

WDYT about adding these test cases to make the test coverage more comprehensive?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6cb895e and fd9b225.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4 hunks)
  • unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (5)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (6)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)

230-232: LGTM! Clean import of the new GroupingPartitionRouter model.

The import is correctly placed in alphabetical order with other model imports.


631-631: LGTM! Factory mapping registration looks good.

The new component is properly registered in the PYDANTIC_MODEL_TO_CONSTRUCTOR mapping.

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (4)

68-70: LGTM! Clean import addition.

The GroupingPartitionRouter import is properly grouped with other similar imports.


3849-3896: LGTM! Well-structured test for GroupingPartitionRouter creation.

The test thoroughly validates:

  • Successful creation of GroupingPartitionRouter with underlying SubstreamPartitionRouter
  • Correct group_size configuration
  • Proper setup of parent stream configs

3905-3949: LGTM! Good negative test case.

The test properly validates that an invalid group_size raises a ValueError with a descriptive message.


3951-3999: LGTM! Good validation test.

The test ensures that using request_option with GroupingPartitionRouter raises an appropriate error.

Copy link
Contributor

@maxi297 maxi297 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a couple of questions just to clarify my understanding

if model.group_size < 1:
raise ValueError(f"Group size must be greater than 0, got {model.group_size}")

if not isinstance(underlying_router, PartitionRouter):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In which case can this happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, we explicitly define the underlying partition router class in the schema, so this check is unnecessary. I'll remove it.


def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
"""Delegate state retrieval to the underlying partition router."""
return self.underlying_partition_router.get_stream_state()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is interesting to me because it feels like we have a gap between the state of the parent and what is actually emitted and I'm not sure if this is fine. Let me explain:

. For example, given we have group_size = 2, we could have the following situation:

  • T0: Parent emit parent_1, state is parent_state_1, no slices created for the child stream yet
  • T1: Parent emit parent_2, state is parent_state_2, we emit a slice to the child

If we were to request the state between T0 and T1, the state would actually be wrong because we haven't consumed parent_1 from the child's perspective. I don't know if we have a process that does that. I could easily see that if we fail on the parent stream between T0 and T1, the sync would stop and maybe at that point we would update the state. However, I don't know if it is actually the case. Do we have something like that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. I'll update the GroupingPartitionRouter to return the state for the last emitted parent record to ensure state consistency.


# Iterate over partitions lazily from the underlying router
for partition in self.underlying_partition_router.stream_slices():
# Extract the partition key (assuming single key-value pair, e.g., {"board_ids": value})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we assume this? Couldn't we group objects like {"key1": "value1", "key2": "value2"}?

To be clear, I'm fine with not supporting that as part of the first iteration but I think we would need to:

  • Have clear error message when we have many values
  • Check quickly that when we will want to support objects there will be a not too annoying way of making this not be a breaking change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both ListPartitionRouter and SubstreamPartitionRouter return partitions as dictionaries with a single key. If a partition router were to return multiple keys, we couldn't guarantee that all partition keys would be consistently present in every partition. Some keys might be missing in the first batch and appear in the next one, making grouping unreliable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm good with that. Does that means that there should be an error if the partition router has multiple values? I would prefer to fail loudly especially since we allow for CustomPartitionRouter

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

3053-3082: Implementation looks good, but could we enhance the error messages?

The implementation properly validates group size and checks for unsupported configurations. I particularly like the comment explaining why request options aren't supported for GroupingPartitionRouter.

Just one suggestion - the error messages on lines 3071 and 3075 could be more descriptive to help users understand why request options don't work with grouped partitions, wdyt?

-                raise ValueError("Request options are not supported for GroupingPartitionRouter.")
+                raise ValueError("Request options are not supported for GroupingPartitionRouter because they are specific to individual partitions and can't be properly applied when partitions are grouped.")
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fd9b225 and 1a1d407.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (6)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (6)

230-232: Good addition of the required import for the new router model.

The import of GroupingPartitionRouter model follows the established pattern in this file, which helps maintain consistency.


385-385: LGTM on adding GroupingPartitionRouter to the imports.

This follows the pattern of importing all partition router implementations in this section.


631-631: Correctly added the model to constructor mapping.

This is required for the factory pattern to work with the new router type.


3059-3060: The group_size validation looks good.

As discussed in the previous comments, keeping the min value at 1 makes sense because even a group_size of 1 can be useful for deduplication purposes.


3063-3076: Well thought-out validation of underlying router configurations.

The validation logic checks for request options in both SubstreamPartitionRouter and ListPartitionRouter, which prevents users from creating invalid configurations. This is a proactive approach to ensure proper component usage.


3077-3082: Constructor call looks good.

The creation of the GroupingPartitionRouter instance includes all the necessary parameters, and the default for deduplicate (True) makes sense as a sensible default.

Copy link
Contributor

@maxi297 maxi297 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚢

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants